---
sidebar_label: Data Subscription
description: "The TDengine data subscription service automatically pushes data written in TDengine to subscribing clients."
title: Data Subscription
---

import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Java from "./_sub_java.mdx";
import JavaWS from "./_sub_java_ws.mdx";
import Python from "./_sub_python.mdx";
import Go from "./_sub_go.mdx";
import Rust from "./_sub_rust.mdx";
import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";

TDengine provides data subscription and consumption interfaces similar to those of message queue products. These interfaces make it easier for applications to obtain data written to TDengine in real time and to process data in the order in which events occurred. This simplifies your time-series data processing systems and reduces your costs because it is no longer necessary to deploy a message queue product such as Kafka.

To use TDengine data subscription, you define topics, as in Kafka. However, a topic in TDengine is based on query conditions for an existing supertable, table, or subtable - in other words, a SELECT statement. You can use SQL to filter data by tag, table name, column, or expression and then apply a scalar function or user-defined function to the data. Aggregate functions are not supported. This gives TDengine data subscription more flexibility than similar products. The granularity of data can be controlled on demand by applications, while filtering and preprocessing are handled by TDengine instead of the application layer. This implementation reduces the amount of data transmitted and the complexity of applications.

By subscribing to a topic, a consumer can obtain the latest data in that topic in real time. Multiple consumers can form a consumer group that consumes messages together. Consumer groups speed up consumption through multi-threaded, distributed data consumption. Note that consumers in different groups that are subscribed to the same topic do not consume messages together. A single consumer can subscribe to multiple topics. If the data in a supertable is sharded across multiple vnodes, consumer groups can consume it much more efficiently than single consumers. TDengine also includes an acknowledgement mechanism that ensures at-least-once delivery in complicated environments where machines may crash or restart.
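
As a rough mental model of how a consumer group shares the work, the following Python sketch distributes vgroups among the consumers of one group so that each vgroup has exactly one owner and surplus consumers stay idle. The round-robin strategy and the names `assign_vgroups`, `c1`, `c2`, and so on are assumptions made for illustration; this page does not describe the algorithm that TDengine actually uses when it rebalances.

```python
from typing import Dict, List


def assign_vgroups(vgroups: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Toy round-robin rebalance: each vgroup goes to exactly one consumer."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    if not consumers:
        return assignment
    for i, vg in enumerate(sorted(vgroups)):
        assignment[consumers[i % len(consumers)]].append(vg)
    return assignment


# Two consumers share three vgroups: every vgroup has exactly one owner.
two = assign_vgroups([2, 3, 4], ["c1", "c2"])
print(two)

# Four consumers but only three vgroups: the extra consumer receives nothing.
four = assign_vgroups([2, 3, 4], ["c1", "c2", "c3", "c4"])
print(four)
```

The second call shows why adding consumers beyond the number of vgroups does not increase throughput.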

To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For more information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers.
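
The idea of a log that remembers write order and forgets old entries can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not TDengine's storage code: the class `ToyWal` and its methods are hypothetical, retention is applied per record by age only, and the real WAL is managed as files on disk.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class ToyWal:
    """Append-only log; each record gets the next version number."""
    retention_seconds: float
    # Each entry is (version, written_at, payload).
    records: List[Tuple[int, float, str]] = field(default_factory=list)
    next_version: int = 0

    def append(self, written_at: float, payload: str) -> int:
        version = self.next_version
        self.records.append((version, written_at, payload))
        self.next_version += 1
        return version

    def expire(self, now: float) -> None:
        """Drop records older than the retention period."""
        self.records = [r for r in self.records if now - r[1] <= self.retention_seconds]

    def read_from(self, version: int) -> List[str]:
        """Return payloads with version >= the requested one, in write order."""
        return [p for v, _, p in self.records if v >= version]


wal = ToyWal(retention_seconds=3600)
wal.append(written_at=0, payload="a")
wal.append(written_at=1800, payload="b")
wal.append(written_at=4000, payload="c")

before = wal.read_from(0)   # all three records are still retained
wal.expire(now=4000)        # "a" is now older than 3600 s and is dropped
after = wal.read_from(0)    # reading from version 0 starts at the oldest survivor
```

A consumer that has not yet read `"a"` when it expires can no longer obtain it, which is why the retention settings have to match how quickly applications consume.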

Tips (using the C interface as an example):
- A consumer group consumes all data under the same topic, and different consumer groups are independent of each other.
- A consumer group consumes all vgroups of a topic. The group can contain multiple consumers, but each vgroup is consumed by only one consumer. If the number of consumers exceeds the number of vgroups, the excess consumers do not consume any data.
- On the server side, only one offset is saved for each vgroup. The offsets of a vgroup increase monotonically but are not necessarily continuous. The offsets of different vgroups are unrelated to each other.
- Each poll returns a result block from the server. The block belongs to a single vgroup and may contain data from multiple WAL versions. Call `tmq_get_vgroup_offset` on the block to obtain the offset of its first record.
- If a consumer group has never committed an offset, its consumers start consuming from the position specified by the `auto.offset.reset` parameter when they restart and pull data again. Within the lifetime of a consumer, the client records the offset of the most recently pulled data locally and does not pull duplicate data.
- If a consumer terminates abnormally (without calling `tmq_consumer_close`), a rebalance of its consumer group is triggered after about 12 seconds. The consumer's status on the server changes to LOST, and the consumer is automatically deleted after about one day. If a consumer exits normally, it is deleted after it exits. When a new consumer is added, a rebalance is triggered after about 2 seconds, and the consumer's status on the server changes to ready.
- A consumer group rebalance reassigns vgroups to all consumers in the group that are in the ready state. A consumer can perform assignment, seek, commit, and poll operations only on the vgroups assigned to it.
- A consumer can call `tmq_position` to obtain the offset of its current consumption position, seek to a specified offset, and consume the data again.
- Seek moves the position to the specified offset without performing a commit. After a successful seek, poll returns the data at the specified offset and the data that follows it.
- Position returns the current consumption position, which is the offset to be fetched next, not the offset of the data consumed most recently.
- Commit submits a consumption position. Without parameters, it commits the current consumption position (the offset to be fetched next, not the offset of the data consumed most recently). With parameters, it commits the offset given in the parameters (that is, the offset from which consumption resumes after the consumer exits and restarts).
- Seek sets the consumer's consumption position. After a seek, position returns the offset that was sought to. Both refer to the offset to be fetched next.
- Seek and commit are different concepts and do not affect each other.
- The begin interface returns the offset of the first data in the WAL, and the end interface returns the offset of the last data in the WAL plus 1.
- Before a seek operation, you must call `tmq_get_topic_assignment` to obtain the vgroup IDs and offset ranges of the consumer. The seek operation checks whether the vgroup ID and offset are valid and reports an error if they are not.
- Because expired WAL data is deleted, the offset may already have expired by the time data is polled, even if the seek operation succeeded. If the polled offset is less than the minimum WAL version number, consumption starts from the minimum WAL version number.
- The `tmq_get_vgroup_offset` interface returns the offset of the first data in the result block that contains the record. Seeking to this offset consumes all the data in this block again. See the fourth tip above.
- Data subscription consumes data from the WAL. If WAL files are deleted according to the WAL retention policy, the deleted data can no longer be consumed. Therefore, set a reasonable value for the `WAL_RETENTION_PERIOD` or `WAL_RETENTION_SIZE` parameter when creating the database, and make sure that your application consumes data in a timely manner so that no data is lost. This behavior is similar to Kafka and other widely used message queue products.
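
The relationship between position, commit, and seek described in the tips above can be illustrated with a small in-memory model. The class `ToyVgroupCursor` is hypothetical and is not part of any TDengine connector. It assumes a single vgroup, uses list indexes as offsets (so `begin` is 0 and `end` is `len(log)`), and starts from the beginning when nothing has been committed.

```python
from typing import List, Optional


class ToyVgroupCursor:
    """Tracks one consumer's position and committed offset for one vgroup."""

    def __init__(self, log: List[str], committed: Optional[int] = None):
        self.log = log              # the list index plays the role of the offset
        self.committed = committed  # last committed offset, if any
        self.position = committed if committed is not None else 0

    def poll(self) -> Optional[str]:
        """Return the record at the current position and advance past it."""
        if self.position >= len(self.log):
            return None
        record = self.log[self.position]
        self.position += 1          # position is the offset to fetch next
        return record

    def commit(self, offset: Optional[int] = None) -> None:
        """Without an argument, commit the current position; otherwise the given offset."""
        self.committed = self.position if offset is None else offset

    def seek(self, offset: int) -> None:
        """Move the position only; the committed offset is left untouched."""
        if not 0 <= offset <= len(self.log):
            raise ValueError("offset outside [begin, end]")
        self.position = offset


log = ["r0", "r1", "r2", "r3"]
cur = ToyVgroupCursor(log)
first_two = [cur.poll(), cur.poll()]  # consumes offsets 0 and 1
cur.commit()                          # commits 2, the next offset to fetch
third = cur.poll()                    # offset 2 is processed but never committed
cur.seek(1)                           # seek changes the position, not the commit
position_after_seek = cur.position
committed_after_seek = cur.committed

# A restarted consumer resumes from the committed offset, so "r2" is delivered again.
restarted = ToyVgroupCursor(log, committed=cur.committed)
redelivered = restarted.poll()
```

The redelivery of `"r2"` after the restart is the at-least-once behavior in miniature: data that was polled but not committed is delivered again rather than lost.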

## Data Schema and API

The related schemas and APIs in various languages are described as follows:

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
    typedef struct tmq_t      tmq_t;
    typedef struct tmq_conf_t tmq_conf_t;
    typedef struct tmq_list_t tmq_list_t;

    typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param));

    typedef enum tmq_conf_res_t {
        TMQ_CONF_UNKNOWN = -2,
        TMQ_CONF_INVALID = -1,
        TMQ_CONF_OK = 0,
    } tmq_conf_res_t;

    typedef struct tmq_topic_assignment {
        int32_t vgId;
        int64_t currentOffset;
        int64_t begin;
        int64_t end;  // The last version of wal + 1
    } tmq_topic_assignment;

    DLL_EXPORT tmq_conf_t    *tmq_conf_new();
    DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
    DLL_EXPORT void           tmq_conf_destroy(tmq_conf_t *conf);
    DLL_EXPORT void           tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);

    DLL_EXPORT tmq_list_t *tmq_list_new();
    DLL_EXPORT int32_t     tmq_list_append(tmq_list_t *, const char *);
    DLL_EXPORT void        tmq_list_destroy(tmq_list_t *);
    DLL_EXPORT int32_t     tmq_list_get_size(const tmq_list_t *);
    DLL_EXPORT char      **tmq_list_to_c_array(const tmq_list_t *);

    DLL_EXPORT tmq_t    *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
    DLL_EXPORT int32_t   tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
    DLL_EXPORT int32_t   tmq_unsubscribe(tmq_t *tmq);
    DLL_EXPORT int32_t   tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
    DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
    DLL_EXPORT int32_t   tmq_consumer_close(tmq_t *tmq);
    DLL_EXPORT int32_t   tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); //Commit the msg’s offset + 1
    DLL_EXPORT void      tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
    DLL_EXPORT int32_t   tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
    DLL_EXPORT void      tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
    DLL_EXPORT int32_t   tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment);
    DLL_EXPORT void      tmq_free_assignment(tmq_topic_assignment* pAssignment);
    DLL_EXPORT int32_t   tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
    DLL_EXPORT int64_t   tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);  // The current offset is the offset of the last consumed message + 1
    DLL_EXPORT int64_t   tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);

    DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
    DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
    DLL_EXPORT int32_t     tmq_get_vgroup_id(TAOS_RES *res);
    DLL_EXPORT int64_t     tmq_get_vgroup_offset(TAOS_RES* res);  // Get current offset of the result
    DLL_EXPORT const char *tmq_err2str(int32_t code);
```

For more information, see [C/C++ Connector](/reference/connector/cpp).

The following example is based on the smart meter table described in Data Models. For complete sample code, see the C language section below.

</TabItem>
<TabItem value="java" label="Java">

```java
void subscribe(Collection<String> topics) throws SQLException;

void unsubscribe() throws SQLException;

Set<String> subscription() throws SQLException;

ConsumerRecords<V> poll(Duration timeout) throws SQLException;

void commitSync() throws SQLException;

void close() throws SQLException;
```

</TabItem>

<TabItem value="Python" label="Python">

```python
class Consumer:
    def subscribe(self, topics):
        pass

    def unsubscribe(self):
        pass

    def poll(self, timeout: float = 1.0):
        pass

    def assignment(self):
        pass

    def close(self):
        pass

    def commit(self, message):
        pass
```

</TabItem>

<TabItem label="Go" value="Go">

```go
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)

// rebalanceCb is reserved for compatibility purpose
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error

// rebalanceCb is reserved for compatibility purpose
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error

func (c *Consumer) Poll(timeoutMs int) tmq.Event

// tmq.TopicPartition is reserved for compatibility purpose
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)

func (c *Consumer) Unsubscribe() error

func (c *Consumer) Close() error
```

</TabItem>

<TabItem label="Rust" value="Rust">

```rust
impl TBuilder for TmqBuilder
  fn from_dsn<D: IntoDsn>(dsn: D) -> Result<Self, Self::Error>
  fn build(&self) -> Result<Self::Target, Self::Error>

impl AsAsyncConsumer for Consumer
  async fn subscribe<T: Into<String>, I: IntoIterator<Item = T> + Send>(
        &mut self,
        topics: I,
    ) -> Result<(), Self::Error>;
  fn stream(
        &self,
    ) -> Pin<
        Box<
            dyn '_
                + Send
                + futures::Stream<
                    Item = Result<(Self::Offset, MessageSet<Self::Meta, Self::Data>), Self::Error>,
                >,
        >,
    >;
  async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>;

  async fn unsubscribe(self);
```

For more information, see [Crate taos](https://docs.rs/taos).

</TabItem>

<TabItem label="Node.JS" value="Node.JS">

```js
function TMQConsumer(config)

function subscribe(topic)

function consume(timeout)

function subscription()

function unsubscribe()

function commit(msg)

function close()
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
ConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config)

virtual IConsumer Build()

Consumer(ConsumerBuilder builder)

void Subscribe(IEnumerable<string> topics)

void Subscribe(string topic) 

ConsumeResult Consume(int millisecondsTimeout)

List<string> Subscription()

void Unsubscribe()
 
void Commit(ConsumeResult consumerResult)

void Close()
```

</TabItem>
</Tabs>

## Insert Data into TDengine

A database including one supertable and two subtables is created as follows:

```sql
DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb WAL_RETENTION_PERIOD 3600;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");       
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11');
```

## Create a Topic

The following SQL statement creates a topic in TDengine:

```sql
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
```

- The number of topics that can be created is limited by the `tmqMaxTopicNum` parameter, which defaults to 20.
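
To see what this topic delivers for the sample data inserted earlier, the following Python sketch applies the same `c1 > 1` condition to those four rows. It only mimics the filter in plain Python and does not connect to TDengine. Timestamps are left out because they depend on when the statements run, and the `table` key is added for readability only.

```python
# Rows written by the INSERT statements in the previous section.
rows = [
    {"table": "ctb0", "c1": 0, "c2": 0.0, "c3": "a0"},
    {"table": "ctb0", "c1": 0, "c2": 0.0, "c3": "a00"},
    {"table": "ctb1", "c1": 1, "c2": 1.0, "c3": "a1"},
    {"table": "ctb1", "c1": 11, "c2": 11.0, "c3": "a11"},
]

# Equivalent of the topic's WHERE c1 > 1 clause, evaluated in plain Python.
delivered = [r for r in rows if r["c1"] > 1]

for r in delivered:
    print(r["table"], r["c1"], r["c2"], r["c3"])  # prints: ctb1 11 11.0 a11
```

Only one of the four rows satisfies the condition, so a consumer of this topic receives a single record and the other three never leave the server.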

Multiple subscription types are supported.

### Subscribe to a Column

Syntax:

```sql
CREATE TOPIC topic_name as subquery
```

You can subscribe to a topic through a SELECT statement. Statements that specify columns, such as `SELECT *` and `SELECT ts, c1`, are supported, as are filtering conditions and scalar functions. Aggregate functions and time window aggregation are not supported. Note:

- The schema of topics created in this manner is determined by the subscribed data.
- You cannot modify (`ALTER <table> MODIFY`) or delete (`ALTER <table> DROP`) columns or tags that are used in a subscription or calculation.
- Columns added to a table after the subscription is created are not displayed in the results. Deleting columns will cause an error.

### Subscribe to a Supertable

Syntax:

```sql
CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]
```

Creating a topic in this manner differs from a `SELECT * from stbName` statement as follows:

- The table schema can be modified.
- Unstructured data is returned. The format of the data returned changes based on the supertable schema.
- The `with meta` parameter is optional. When it is specified, statements such as those that create supertables and subtables are also returned. This is mainly used by taosX to migrate supertables.
- The `where_condition` parameter is optional and restricts the subscription to subtables that meet the condition. The condition cannot reference ordinary columns, only tags or `tbname`. Functions can be used in the condition to filter tags, but aggregate functions cannot, because subtable tag values cannot be aggregated. The condition can also be a constant expression, such as `2 > 1` (subscribe to all subtables) or `false` (subscribe to no subtables).
- The data returned does not include tags.
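
The effect of `where_condition` on a supertable topic can be pictured as selecting subtables by their tags or names. The sketch below models this in Python with the two subtables from the sample database. The helper `matching_subtables` is hypothetical, and the SQL fragments in the comments are only indicative of the kind of condition being modeled.

```python
# Tag values of the subtables created in the sample database.
subtables = {
    "ctb0": {"t1": 0, "t3": "subtable0"},
    "ctb1": {"t1": 1, "t3": "subtable1"},
}


def matching_subtables(predicate):
    """Return the names of subtables whose (name, tags) satisfy the predicate."""
    return sorted(name for name, tags in subtables.items() if predicate(name, tags))


by_tag = matching_subtables(lambda name, tags: tags["t1"] == 1)   # like: WHERE t1 = 1
by_name = matching_subtables(lambda name, tags: name == "ctb0")   # like: WHERE tbname = 'ctb0'
everything = matching_subtables(lambda name, tags: 2 > 1)         # constant true: all subtables
nothing = matching_subtables(lambda name, tags: False)            # constant false: no subtables
```

Because the predicate sees only the table name and tags, it cannot depend on ordinary column values, which mirrors the restriction stated above.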

### Subscribe to a Database

Syntax:

```sql
CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
```

This SQL statement creates a subscription to all tables in the database.

- The `with meta` parameter is optional. When it is specified, the statements that create all supertables and subtables in the database are also returned. This is mainly used by taosX to migrate databases.

## Create a Consumer

You configure the following parameters when creating a consumer:

|         Parameter         |  Type   | Description                                                         | Remarks |
| :-----------------------: | :-----: | ------------------------------------------------------------------- | ------- |
| `td.connect.ip`           | string  | IP address of the server                                            |         |
| `td.connect.user`         | string  | Username                                                            |         |
| `td.connect.pass`         | string  | Password                                                            |         |
| `td.connect.port`         | string  | Port of the server                                                  |         |
| `group.id`                | string  | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. Each topic can have up to 100 consumer groups. |
| `client.id`               | string  | Client ID                                                           | Maximum length: 192. |
| `auto.offset.reset`       | enum    | Initial offset for the consumer group                               | `earliest` (default): subscribe from the earliest data; `latest`: subscribe from the latest data; `none`: cannot subscribe without a committed offset |
| `enable.auto.commit`      | boolean | Whether to commit automatically. `true`: the application does not need to commit explicitly; `false`: the application must commit by itself | Default value: true |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds                     |         |
| `msg.with.table.name`     | boolean | Whether to deserialize table names from messages                    | Default value: false |

The method of specifying these parameters depends on the language used:

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
/* Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
   an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass) */
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);

tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
```

</TabItem>
<TabItem value="java" label="Java">

Java programs use the following parameters:

| Parameter                     | Type   | Description |
| ----------------------------- | ------ | ----------- |
| `td.connect.type`             | string | Connection type: "jni" means native connection and "ws" means WebSocket connection. The default is "jni". |
| `bootstrap.servers`           | string | Connection address, such as `localhost:6030` |
| `value.deserializer`          | string | Value deserializer; to use this method, implement the `com.taosdata.jdbc.tmq.Deserializer` interface or inherit the `com.taosdata.jdbc.tmq.ReferenceDeserializer` type |
| `value.deserializer.encoding` | string | Specify the encoding for string deserialization |

Note: The `bootstrap.servers` parameter is used instead of `td.connect.ip` and `td.connect.port` to provide an interface that is consistent with Kafka.

```java
Properties properties = new Properties();
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");

TaosConsumer<Meters> consumer = new TaosConsumer<>(properties);

/* value deserializer definition. */
import com.taosdata.jdbc.tmq.ReferenceDeserializer;

public class MetersDeserializer extends ReferenceDeserializer<Meters> {
}
```

</TabItem>

<TabItem label="Go" value="Go">

```go
conf := &tmq.ConfigMap{
 "group.id":                     "test",
 "auto.offset.reset":            "earliest",
 "td.connect.ip":                "127.0.0.1",
 "td.connect.user":              "root",
 "td.connect.pass":              "taosdata",
 "td.connect.port":              "6030",
 "client.id":                    "test_tmq_c",
 "enable.auto.commit":           "false",
 "msg.with.table.name":          "true",
}
consumer, err := NewConsumer(conf)
```

</TabItem>

<TabItem label="Rust" value="Rust">

```rust
let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1");
dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "earliest");

let tmq = TmqBuilder::from_dsn(dsn)?;

let mut consumer = tmq.build()?;
```

</TabItem>

<TabItem value="Python" label="Python">

```python
from taos.tmq import Consumer

# Syntax: `consumer = Consumer(configs)`
#
# Example:
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```

</TabItem>

<TabItem label="Node.JS" value="Node.JS">

```js
// Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit),
// an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass) 

let consumer = taos.consumer({
  'enable.auto.commit': 'true',
  'auto.commit.interval.ms': '1000',
  'group.id': 'tg2',
  'td.connect.user': 'root',
  'td.connect.pass': 'taosdata',
  'auto.offset.reset': 'earliest',
  'msg.with.table.name': 'true',
  'td.connect.ip': '127.0.0.1',
  'td.connect.port': '6030'
});
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
using TDengineTMQ;

// Create consumer groups on demand (GourpId) and enable automatic commits (EnableAutoCommit),
// an automatic commit interval (AutoCommitIntervalMs), and a username (TDConnectUser) and password (TDConnectPasswd)
var cfg = new ConsumerConfig
{
    EnableAutoCommit = "true",
    AutoCommitIntervalMs = "1000",
    GourpId = "TDengine-TMQ-C#",
    TDConnectUser = "root",
    TDConnectPasswd = "taosdata",
    AutoOffsetReset = "earliest",
    MsgWithTableName = "true",
    TDConnectIp = "127.0.0.1",
    TDConnectPort = "6030"
};

var consumer = new ConsumerBuilder(cfg).Build();

```

</TabItem>

</Tabs>

A consumer group is automatically created when multiple consumers are configured with the same consumer group ID.
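
The `auto.offset.reset` values in the parameter table above can be read as a rule for choosing the first offset to consume when a consumer starts on a vgroup. The following Python sketch spells out that rule. The function `starting_offset` and its arguments are hypothetical, and it assumes, in line with the tips at the top of this page, that a committed offset takes precedence and that `begin` and `end` are the first WAL offset and the last WAL offset plus 1.

```python
from typing import Optional


def starting_offset(committed: Optional[int], reset: str, begin: int, end: int) -> int:
    """Pick the first offset to consume for one vgroup."""
    if committed is not None:
        return committed  # a committed offset takes precedence
    if reset == "earliest":
        return begin      # oldest data still available in the WAL
    if reset == "latest":
        return end        # only data written from now on
    if reset == "none":
        raise RuntimeError("no committed offset and auto.offset.reset is 'none'")
    raise ValueError(f"unknown auto.offset.reset value: {reset!r}")


fresh_earliest = starting_offset(None, "earliest", begin=100, end=250)  # 100
fresh_latest = starting_offset(None, "latest", begin=100, end=250)      # 250
resumed = starting_offset(180, "latest", begin=100, end=250)            # 180
```

With `none`, the sketch raises an error for a group that has never committed, which corresponds to the table's statement that such a consumer cannot subscribe.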

## Subscribe to a Topic

A single consumer can subscribe to multiple topics.

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
// Create a list of subscribed topics
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topicName");
// Enable subscription
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
  
```

</TabItem>
<TabItem value="java" label="Java">

```java
List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);
```

</TabItem>
<TabItem value="Go" label="Go">

```go
err = consumer.Subscribe("example_tmq_topic", nil)
if err != nil {
 panic(err)
}
```

</TabItem>
<TabItem value="Rust" label="Rust">

```rust
consumer.subscribe(["tmq_meters"]).await?;
```

</TabItem>

<TabItem value="Python" label="Python">

```python
consumer.subscribe(['topic1', 'topic2'])
```

</TabItem>

<TabItem label="Node.JS" value="Node.JS">

```js
// Create a list of subscribed topics
let topics = ['topic_test']

// Enable subscription
consumer.subscribe(topics);
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
// Create a list of subscribed topics
List<string> topics = new List<string>();
topics.Add("tmq_topic");
// Enable subscription
consumer.Subscribe(topics);
```

</TabItem>

</Tabs>

## Consume Messages

The following code demonstrates how to consume the messages in a queue.

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
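// Consume data
while (running) {
  TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut);
  if (msg == NULL) continue;  // no data arrived within the timeout
  msg_process(msg);
  taos_free_result(msg);      // release the result after processing
}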
```

The `while` loop obtains a message each time it calls `tmq_consumer_poll()`. This message is exactly the same as the result returned by a query, and the same deserialization API can be used on it.

</TabItem>
<TabItem value="java" label="Java">

```java
while (running) {
  ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
  for (Meters meter : meters) {
    processMsg(meter);
  }
}
```

</TabItem>

<TabItem value="Go" label="Go">

```go
for {
 ev := consumer.Poll(0)
 if ev != nil {
  switch e := ev.(type) {
  case *tmqcommon.DataMessage:
   fmt.Println(e.Value())
  case tmqcommon.Error:
   fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
   panic(e)
  }
  consumer.Commit()
 }
}
```

</TabItem>

<TabItem value="Rust" label="Rust">

```rust
{
    let mut stream = consumer.stream();

    while let Some((offset, message)) = stream.try_next().await? {
        // get information from offset

        // the topic
        let topic = offset.topic();
        // the vgroup id, like partition id in kafka.
        let vgroup_id = offset.vgroup_id();
        println!("* in vgroup id {vgroup_id} of topic {topic}\n");

        if let Some(data) = message.into_data() {
            while let Some(block) = data.fetch_raw_block().await? {
                // one block for one table, get table name if needed
                let name = block.table_name();
                let records: Vec<Record> = block.deserialize().try_collect()?;
                println!(
                    "** table: {}, got {} records: {:#?}\n",
                    name.unwrap(),
                    records.len(),
                    records
                );
            }
        }
        consumer.commit(offset).await?;
    }
}
```

</TabItem>
<TabItem value="Python" label="Python">

```python
while True:
    res = consumer.poll(100)
    if not res:
        continue
    err = res.error()
    if err is not None:
        raise err
    val = res.value()

    for block in val:
        print(block.fetchall())
```

</TabItem>

<TabItem label="Node.JS" value="Node.JS">

```js
while(true){
  const msg = consumer.consume(200);
  // process message(consumeResult)
  console.log(msg.topicPartition);
  console.log(msg.block);
  console.log(msg.fields);
}
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
// Consume data
while (true)
{
    var consumerRes = consumer.Consume(100);
    // process ConsumeResult
    ProcessMsg(consumerRes);
    consumer.Commit(consumerRes);
}
```

</TabItem>

</Tabs>

## Close the Consumer

After message consumption is finished, unsubscribe the consumer and then close it.

<Tabs defaultValue="java" groupId="lang">
<TabItem value="c" label="C">

```c
/* Unsubscribe */
tmq_unsubscribe(tmq);

/* Close consumer object */
tmq_consumer_close(tmq);
```

</TabItem>
<TabItem value="java" label="Java">

```java
/* Unsubscribe */
consumer.unsubscribe();

/* Close consumer */
consumer.close();
```

</TabItem>

<TabItem value="Go" label="Go">

```go
/* Unsubscribe */
_ = consumer.Unsubscribe()

/* Close consumer */
_ = consumer.Close()
```

</TabItem>

<TabItem value="Rust" label="Rust">

```rust
consumer.unsubscribe().await;
```

</TabItem>

<TabItem value="Python" label="Python">

```py
# Unsubscribe
consumer.unsubscribe()
# Close consumer
consumer.close()
```

</TabItem>
<TabItem label="Node.JS" value="Node.JS">

```js
consumer.unsubscribe();
consumer.close();
```

</TabItem>

<TabItem value="C#" label="C#">

```csharp
// Unsubscribe
consumer.Unsubscribe();

// Close consumer
consumer.Close();
```

</TabItem>

</Tabs>

## Delete a Topic

You can delete topics that are no longer useful. Note that you must unsubscribe all consumers from a topic before deleting it.

```sql
/* Delete topic */
DROP TOPIC topic_name;
```

## Check Status

1. Query all existing topics.

```sql
SHOW TOPICS;
```

2. Query the status and subscribed topics of all consumers.

```sql
SHOW CONSUMERS;
```

3. Query the relationships between consumers and vgroups.

```sql
SHOW SUBSCRIPTIONS;
```

## Examples

The following section shows sample code in various languages.

<Tabs defaultValue="java" groupId="lang">

<TabItem label="C" value="c">
  <CDemo />
</TabItem>

<TabItem label="Java" value="java">
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
<Java />
</TabItem>
<TabItem value="ws" label="WebSocket connection">
<JavaWS />
</TabItem>
</Tabs>
</TabItem>

<TabItem label="Go" value="Go">
   <Go/>
</TabItem>

<TabItem label="Rust" value="Rust">
    <Rust />
</TabItem>

<TabItem label="Python" value="Python">
    <Python />
</TabItem>

<TabItem label="Node.JS" value="Node.JS">
   <Node/>
</TabItem>

<TabItem label="C#" value="C#">
   <CSharp/>
</TabItem>

</Tabs>
